#!/usr/bin/r
library(foreach)
#library(RSQLite)
library(doParallel)
library(parallel)
library(RMySQL)
library(int64)
source("lib_gps.r")
# ---------------------------------------------------------------------------
# Setup: parallel workers and database connection.
# ---------------------------------------------------------------------------
# Use all cores but one, never fewer than one. detectCores() may return NA,
# which na.rm=TRUE discards; because the literal 1 is always part of the
# arguments the result can never be infinite, so no further check is needed.
cores<-max(1,detectCores()-1,na.rm=TRUE)
# Backend for foreach/%dopar% ...
registerDoParallel(cores)
# ... and a separate explicit cluster handle for the parallel::par* helpers.
cl<-makeCluster(cores)

# NOTE(review): credentials are hard-coded here; consider reading them from
# the environment or a MySQL option file.
con<-dbConnect(MySQL(), user='gps', password='gps', dbname='gps', host='localhost')

# Mark every row currently in gpsin as belonging to this run; only rows with
# raw=1 are parsed below and deleted at the end of the script.
# dbExecute() runs the statement and clears its result set (dbSendStatement()
# would leave an open result on the connection).
invisible(dbExecute(con,"update gpsin set raw=1;"))

# ---------------------------------------------------------------------------
# RMC telegrams -> table gpsrmc
# Payload columns used (by position): 1 time of fix (hhmmss[.ss]),
# 3 latitude, 5 longitude, 7 speed, 9 date (ddmmyy).
# ---------------------------------------------------------------------------
rmcin<-dbGetQuery(con,"select uid,cast(id as char) as id,cast(epoch as char) as epoch,telegram,payload from gpsin where telegram like '%RMC' and raw=1;")
if(nrow(rmcin)>0){
	# id/epoch are fetched as text and converted to 64-bit integers here.
	rmcin$id<-as.int64(rmcin$id)
	rmcin$epoch<-as.int64(rmcin$epoch)

	# Split the comma separated payload into a character table, one row per
	# telegram. Rows are padded with NA to a common width of at least 9 so
	# that the positional accesses below can never hit the metadata columns
	# (id, uid, ...) appended after the payload columns.
	sid<-strsplit(rmcin$payload,",")
	maxcol<-max(9,sapply(sid,length))
	inrmc<-as.data.frame(t(vapply(sid,function(f)f[seq_len(maxcol)],character(maxcol))),stringsAsFactors=FALSE)
	inrmc$id<-rmcin$id
	inrmc$uid<-rmcin$uid
	inrmc$epoch<-rmcin$epoch
	inrmc$telegram<-rmcin$telegram

	# Only telegrams that carry both a date and a time can be used.
	inrmc<-inrmc[!is.na(inrmc[,9])&!is.na(inrmc[,1]),]
	if(nrow(inrmc)>0){
		# Left-pad strings shorter than six characters with zeros.
		pad6<-function(x)paste0(substring("000000",1L,pmax(0L,6L-nchar(x))),x)
		# Date and time are concatenated and parsed as GMT; strptime() is
		# vectorised, so no per-row loop is required. Unparsable input gives NA.
		fix_epoch<-as.numeric(strptime(paste0(pad6(inrmc[,9]),pad6(inrmc[,1])),format="%d%m%y%H%M%S",tz="GMT"))

		rmcdf<-data.frame(uid=inrmc$uid,epoch=inrmc$epoch,id=inrmc$id,
			breite=txt2bl(inrmc[,3]),laenge=txt2bl(inrmc[,5]),
			velocity=round(as.numeric(inrmc[,7])*1000),
			fix=as.numeric(inrmc[,1]),
			# second character of the telegram name is stored as "system"
			system=substr(inrmc$telegram,2,2),
			fix_epoch=fix_epoch,
			stringsAsFactors=FALSE)
		# Range check on the converted coordinates. which() drops rows whose
		# coordinates are NA; a bare logical index would turn them into
		# all-NA rows instead.
		rmcdf<-rmcdf[which((rmcdf$breite<=5400000000)&(rmcdf$laenge<=10800000000)),]
		dbCatchWriteTable(con,"gpsrmc",rmcdf,nfr=1000)
		rm(rmcdf)
	}
	rm(inrmc,rmcin)
	gc()
}
# ---------------------------------------------------------------------------
# GGA telegrams -> table gpsgga
# Payload columns used (by position): 1 time of fix (hhmmss[.ss]),
# 2 latitude, 3 N/S, 4 longitude, 5 E/W, 6 (must be present), 7 satellites,
# 8 HDOP, 9 height, 10 height unit (must be "M"), 11 second height value.
# ---------------------------------------------------------------------------
rmcin<-dbGetQuery(con,"select uid,cast(id as char) as id,cast(epoch as char) as epoch,telegram,payload from gpsin where telegram like '%GGA' and raw=1;")
if(nrow(rmcin)>0){
	# Split the payload into a character table padded with NA to at least 11
	# columns, so positional access never reaches the metadata columns
	# appended below.
	sid<-strsplit(rmcin$payload,",")
	maxcol<-max(11,sapply(sid,length))
	inrmc<-as.data.frame(t(vapply(sid,function(f)f[seq_len(maxcol)],character(maxcol))),stringsAsFactors=FALSE)
	inrmc$id<-as.int64(rmcin$id)
	inrmc$uid<-rmcin$uid
	inrmc$epoch<-as.int64(rmcin$epoch)
	# Keep the textual epoch as well; it is needed to derive the date below.
	inrmc$epoch_txt<-rmcin$epoch
	inrmc$telegram<-rmcin$telegram

	# %in% never yields NA, so incomplete telegrams are dropped rather than
	# turned into all-NA rows by an NA row index.
	keep<-(inrmc[,10]%in%"M")&(inrmc[,3]%in%c("N","S"))&(inrmc[,5]%in%c("E","W"))&!is.na(inrmc[,1])&!is.na(inrmc[,6])
	inrmc<-inrmc[keep,]
	if(nrow(inrmc)>0){
		# Left-pad strings shorter than six characters with zeros.
		pad6<-function(x)paste0(substring("000000",1L,pmax(0L,6L-nchar(x))),x)
		# GGA carries no date, so the date is taken from the row's epoch
		# (treated as seconds since 1970-01-01, as the previous "%s" parse
		# did). The date is formatted in GMT because it is combined with the
		# telegram's time of day and parsed as GMT.
		# NOTE(review): a fix taken just before midnight but stored just after
		# it still gets the later date -- confirm whether that matters.
		fix_date<-format(as.POSIXct(as.numeric(inrmc$epoch_txt),origin="1970-01-01",tz="GMT"),"%d%m%y",tz="GMT")
		fix_epoch<-as.numeric(strptime(paste0(fix_date,pad6(inrmc[,1])),format="%d%m%y%H%M%S",tz="GMT"))

		ggadf<-data.frame(uid=inrmc$uid,epoch=inrmc$epoch,id=inrmc$id,
			fix=as.numeric(inrmc[,1]),breite=txt2bl(inrmc[,2]),laenge=txt2bl(inrmc[,4]),
			sats=as.numeric(inrmc[,7]),hdop=as.numeric(inrmc[,8]),
			hoehe=as.numeric(inrmc[,9])*1000,
			hoehe_datum=as.numeric(inrmc[,11])*1000,
			fix_epoch=fix_epoch,
			stringsAsFactors=FALSE)
		# Range check on the converted coordinates; which() drops NA rows.
		ggadf<-ggadf[which((ggadf$breite<=5400000000)&(ggadf$laenge<=10800000000)),]
		dbCatchWriteTable(con,"gpsgga",ggadf,nfr=1000)
		rm(ggadf)
	}
	rm(inrmc,rmcin)
	gc()
}

# ---------------------------------------------------------------------------
# GSV telegrams -> table gpsgsv
# Payload columns used (by position): 1-3 numeric header fields (column 2 is
# used as the 1-based message number when numbering satellites), followed by
# up to four satellite groups of (prn, elevation, azimuth, snr) in columns
# 4-7, 8-11, 12-15 and 16-19.
# ---------------------------------------------------------------------------
rmcin<-dbGetQuery(con,"select uid,cast(id as char) as id,cast(epoch as char) as epoch,telegram,payload from gpsin where telegram like '%GSV' and raw=1;")
if(nrow(rmcin)>0){
	# id/epoch stay character here and are converted to int64 only after the
	# satellite groups have been stacked.

	# Pad to at least 19 payload columns. Without this, a batch whose widest
	# telegram has fewer than four satellite groups would make the group
	# slices below read the appended id/uid/epoch/telegram columns (or fail
	# with undefined columns).
	sid<-strsplit(rmcin$payload,",")
	maxcol<-max(19,sapply(sid,length))
	inrmc<-as.data.frame(t(vapply(sid,function(f)f[seq_len(maxcol)],character(maxcol))),stringsAsFactors=FALSE)
	inrmc$id<-rmcin$id
	inrmc$uid<-rmcin$uid
	inrmc$epoch<-rmcin$epoch
	inrmc$telegram<-rmcin$telegram

	# Header fields must be numeric, present and within plausible bounds.
	for(i in 1:3)inrmc[,i]<-as.numeric(inrmc[,i])
	inrmc<-inrmc[!is.na(inrmc[,1])&!is.na(inrmc[,2])&!is.na(inrmc[,3]),]
	inrmc<-inrmc[(inrmc[,1]<=6)&(inrmc[,2]<=6)&(inrmc[,3]>0),]
	# At least one field in columns 4-16 has to be present.
	inrmc<-inrmc[rowSums(!is.na(inrmc[,4:16]))>0,]
	if(nrow(inrmc)>0){
		inrmc$system<-substr(inrmc$telegram,2,2)

		# Stack the four satellite groups into long format, one row per
		# satellite slot; "folge" is the slot number within the telegram.
		gsvdf<-foreach(i=0:3,.combine=rbind,.inorder=FALSE,.multicombine=TRUE)%dopar%{
			out<-inrmc[,4+i*4+0:3]
			names(out)<-c("prn","elevation","azimuth","snr")
			out<-cbind(out,inrmc[,c("uid","id","epoch","telegram","system")])
			out$folge<-i+1
			out$row<-inrmc[,2]-1
			out
		}
		# NOTE(review): empty-string PRNs (",," inside a payload) are not NA
		# and are therefore kept -- confirm that is intended.
		gsvdf<-gsvdf[!is.na(gsvdf$prn),]
		# Running satellite index across the telegrams of one GSV sequence.
		gsvdf$folge<-gsvdf$folge+gsvdf$row*4
		gsvdf$id<-as.int64(gsvdf$id)
		gsvdf$epoch<-as.int64(gsvdf$epoch)
		gsvdf<-gsvdf[,c("uid","id","epoch","system","folge","prn","elevation","azimuth","snr")]
		dbCatchWriteTable(con,"gpsgsv",gsvdf,nfr=1000)

		rm(inrmc,rmcin,gsvdf)
		gc()
	}
}

# ---------------------------------------------------------------------------
# GSA telegrams -> table gpsgsa
# Payload columns used (by position): 3-14 satellite ids, 15 PDOP, 16 HDOP,
# 17 VDOP.
# ---------------------------------------------------------------------------
rmcin<-dbGetQuery(con,"select uid,cast(id as char) as id,cast(epoch as char) as epoch,telegram,payload from gpsin where telegram like '%GSA' and raw=1;")
if(nrow(rmcin)>0){
	rmcin$id<-as.int64(rmcin$id)
	rmcin$epoch<-as.int64(rmcin$epoch)

	# Pad to at least 17 payload columns so that columns 15-17 always refer
	# to payload fields and never to the metadata columns appended below.
	sid<-strsplit(rmcin$payload,",")
	maxcol<-max(17,sapply(sid,length))
	inrmc<-as.data.frame(t(vapply(sid,function(f)f[seq_len(maxcol)],character(maxcol))),stringsAsFactors=FALSE)
	inrmc$id<-rmcin$id
	inrmc$uid<-rmcin$uid
	inrmc$epoch<-rmcin$epoch
	inrmc$telegram<-rmcin$telegram

	# Per telegram: the non-empty satellite ids, sorted numerically and
	# joined with "_" (empty string when no satellite is listed).
	sats<-vapply(seq_len(nrow(inrmc)),function(i){
		tg<-unlist(inrmc[i,3:14],use.names=FALSE)
		tg<-as.numeric(tg[!is.na(tg)&(nchar(tg)>0)])
		# sort() also discards values that failed numeric conversion (NA)
		paste(sort(tg),collapse="_")
	},character(1))

	gsadf<-data.frame(uid=inrmc$uid,epoch=inrmc$epoch,id=inrmc$id,
		pdop=as.numeric(inrmc[,15]),hdop=as.numeric(inrmc[,16]),vdop=as.numeric(inrmc[,17]),
		system=substr(inrmc$telegram,2,2),
		sats=sats,
		stringsAsFactors=FALSE)
	# Telegrams without any satellite are not stored.
	gsadf<-gsadf[nchar(gsadf$sats)>0,]
	dbCatchWriteTable(con,"gpsgsa",gsadf)

	rm(inrmc,rmcin,gsadf)
	gc()
}

# ---------------------------------------------------------------------------
# GBS telegrams -> table gpsgbs
# Payload columns used (by position): 2 latitude error, 3 longitude error,
# 4 altitude error.
# ---------------------------------------------------------------------------
rmcin<-dbGetQuery(con,"select uid,cast(id as char) as id,cast(epoch as char) as epoch,telegram,payload from gpsin where telegram like '%GBS' and raw=1;")
if(nrow(rmcin)>0){
	rmcin$id<-as.int64(rmcin$id)
	rmcin$epoch<-as.int64(rmcin$epoch)

	# Pad to at least 4 payload columns so that columns 2-4 always refer to
	# payload fields and never to the metadata columns appended below.
	sid<-strsplit(rmcin$payload,",")
	maxcol<-max(4,sapply(sid,length))
	inrmc<-as.data.frame(t(vapply(sid,function(f)f[seq_len(maxcol)],character(maxcol))),stringsAsFactors=FALSE)
	inrmc$id<-rmcin$id
	inrmc$uid<-rmcin$uid
	inrmc$epoch<-rmcin$epoch
	inrmc$telegram<-rmcin$telegram

	gbsdf<-data.frame(uid=inrmc$uid,epoch=inrmc$epoch,id=inrmc$id,
		system=substr(inrmc$telegram,2,2),
		lat_err=as.numeric(inrmc[,2]),
		lon_err=as.numeric(inrmc[,3]),
		alt_err=as.numeric(inrmc[,4]),
		stringsAsFactors=FALSE)
	# Keep rows with fewer than four missing values.
	gbsdf<-gbsdf[rowSums(is.na(gbsdf),na.rm=TRUE)<4,]
	dbCatchWriteTable(con,"gpsgbs",gbsdf)

	rm(gbsdf,inrmc,rmcin)
	gc()
}

# ---------------------------------------------------------------------------
# GRS telegrams -> table gpsgrs
# Payload columns used (by position): 1 -> fgga, 2 -> fix, 3-14 -> S1..S12.
# ---------------------------------------------------------------------------
rmcin<-dbGetQuery(con,"select uid,cast(id as char) as id,cast(epoch as char) as epoch,telegram,payload from gpsin where telegram like '%GRS' and raw=1;")
if(nrow(rmcin)>0){
	rmcin$id<-as.int64(rmcin$id)
	rmcin$epoch<-as.int64(rmcin$epoch)

	# Pad to at least 14 payload columns so that columns 3-14 always refer to
	# payload fields and never to the metadata columns appended below.
	sid<-strsplit(rmcin$payload,",")
	maxcol<-max(14,sapply(sid,length))
	inrmc<-as.data.frame(t(vapply(sid,function(f)f[seq_len(maxcol)],character(maxcol))),stringsAsFactors=FALSE)
	inrmc$id<-rmcin$id
	inrmc$uid<-rmcin$uid
	inrmc$epoch<-rmcin$epoch
	inrmc$telegram<-rmcin$telegram

	grsdf<-data.frame(uid=inrmc$uid,epoch=inrmc$epoch,id=inrmc$id,
		system=substr(inrmc$telegram,2,2),
		fix=as.numeric(inrmc[,2]),
		fgga=as.numeric(inrmc[,1]),
		stringsAsFactors=FALSE)
	# One numeric column S1..S12 per payload column 3..14.
	for(ig in 1:12)grsdf[,paste0("S",ig)]<-as.numeric(inrmc[,2+ig])
	# Keep rows with fewer than ten missing values.
	grsdf<-grsdf[rowSums(is.na(grsdf),na.rm=TRUE)<10,]
	dbCatchWriteTable(con,"gpsgrs",grsdf)

	rm(grsdf,inrmc,rmcin)
	gc()
}

# ---------------------------------------------------------------------------
# GST telegrams -> table gpsgst
# Payload columns used (by position): 2 rms, 6 latitude sigma,
# 7 longitude sigma, 8 height sigma.
# ---------------------------------------------------------------------------
rmcin<-dbGetQuery(con,"select uid,cast(id as char) as id,cast(epoch as char) as epoch,telegram,payload from gpsin where telegram like '%GST' and raw=1;")
if(nrow(rmcin)>0){
	rmcin$id<-as.int64(rmcin$id)
	rmcin$epoch<-as.int64(rmcin$epoch)

	# Pad to at least 8 payload columns so that columns 6-8 always refer to
	# payload fields and never to the metadata columns appended below.
	sid<-strsplit(rmcin$payload,",")
	maxcol<-max(8,sapply(sid,length))
	inrmc<-as.data.frame(t(vapply(sid,function(f)f[seq_len(maxcol)],character(maxcol))),stringsAsFactors=FALSE)
	inrmc$id<-rmcin$id
	inrmc$uid<-rmcin$uid
	inrmc$epoch<-rmcin$epoch
	inrmc$telegram<-rmcin$telegram

	gstdf<-data.frame(uid=inrmc$uid,epoch=inrmc$epoch,id=inrmc$id,
		system=substr(inrmc$telegram,2,2),
		rms=as.numeric(inrmc[,2]),
		lat_sigma=as.numeric(inrmc[,6]),
		lon_sigma=as.numeric(inrmc[,7]),
		height_sigma=as.numeric(inrmc[,8]),
		stringsAsFactors=FALSE)
	# Rows without an rms value are not stored.
	gstdf<-gstdf[!is.na(gstdf$rms),]
	dbCatchWriteTable(con,"gpsgst",gstdf)

	rm(gstdf,inrmc,rmcin)
	gc()
}

# ---------------------------------------------------------------------------
# VTG telegrams -> table gpsvtg
# Payload columns used (by position): 1 true course, 3 magnetic course,
# 5 speed (stored as speed_kn, scaled by 1000), 7 speed (stored as
# speed_kmh, scaled by 1000).
# ---------------------------------------------------------------------------
rmcin<-dbGetQuery(con,"select uid,cast(id as char) as id,cast(epoch as char) as epoch,telegram,payload from gpsin where telegram like '%VTG' and raw=1;")
if(nrow(rmcin)>0){
	rmcin$id<-as.int64(rmcin$id)
	rmcin$epoch<-as.int64(rmcin$epoch)

	# Pad to at least 7 payload columns so that columns 1-7 always refer to
	# payload fields and never to the metadata columns appended below.
	sid<-strsplit(rmcin$payload,",")
	maxcol<-max(7,sapply(sid,length))
	inrmc<-as.data.frame(t(vapply(sid,function(f)f[seq_len(maxcol)],character(maxcol))),stringsAsFactors=FALSE)
	inrmc$id<-rmcin$id
	inrmc$uid<-rmcin$uid
	inrmc$epoch<-rmcin$epoch
	inrmc$telegram<-rmcin$telegram

	vtgdf<-data.frame(uid=inrmc$uid,epoch=inrmc$epoch,id=inrmc$id,
		system=substr(inrmc$telegram,2,2),
		course_true=as.numeric(inrmc[,1]),
		course_mag=as.numeric(inrmc[,3]),
		speed_kn=as.numeric(inrmc[,5])*1000,
		speed_kmh=as.numeric(inrmc[,7])*1000,
		stringsAsFactors=FALSE)
	dbCatchWriteTable(con,"gpsvtg",vtgdf)

	rm(vtgdf,inrmc,rmcin)
	gc()
}


# ---------------------------------------------------------------------------
# GNS telegrams -> table gpsgns
# Payload columns used (by position): 2 latitude, 4 longitude, 6 mode,
# 7 satellites, 8 HDOP, 9 height, 10 second height value.
# ---------------------------------------------------------------------------
rmcin<-dbGetQuery(con,"select uid,cast(id as char) as id,cast(epoch as char) as epoch,telegram,payload from gpsin where telegram like '%GNS' and raw=1;")
if(nrow(rmcin)>0){
	rmcin$id<-as.int64(rmcin$id)
	rmcin$epoch<-as.int64(rmcin$epoch)

	# Pad to at least 10 payload columns so that columns 2-10 always refer to
	# payload fields and never to the metadata columns appended below.
	sid<-strsplit(rmcin$payload,",")
	maxcol<-max(10,sapply(sid,length))
	inrmc<-as.data.frame(t(vapply(sid,function(f)f[seq_len(maxcol)],character(maxcol))),stringsAsFactors=FALSE)
	inrmc$id<-rmcin$id
	inrmc$uid<-rmcin$uid
	inrmc$epoch<-rmcin$epoch
	inrmc$telegram<-rmcin$telegram

	# mode and sats are stored as received (character), everything else is
	# converted to numbers.
	gnsdf<-data.frame(uid=inrmc$uid,epoch=inrmc$epoch,id=inrmc$id,
		breite=txt2bl(inrmc[,2]),laenge=txt2bl(inrmc[,4]),
		mode=inrmc[,6],sats=inrmc[,7],
		hdop=as.numeric(inrmc[,8]),
		hoehe=as.numeric(inrmc[,9])*1000,
		hoehe_datum=as.numeric(inrmc[,10])*1000,
		stringsAsFactors=FALSE)
	dbCatchWriteTable(con,"gpsgns",gnsdf,nfr=1000)

	rm(gnsdf,inrmc,rmcin)
	gc()
}

# ---------------------------------------------------------------------------
# Teardown: remove the processed raw rows and release all resources.
# ---------------------------------------------------------------------------
# Every row flagged raw=1 is deleted, including telegram types that were not
# parsed by any section of this script.
# dbExecute() clears its result set, so the connection is closed without a
# pending result.
invisible(dbExecute(con,"delete from gpsin where raw=1;"))
dbDisconnect(con)
# Shut down the worker processes started with makeCluster().
stopCluster(cl)
